package com.zhangc.blog.base.batchtask;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.google.common.collect.Lists;

@Service
public class BatchService {
    private static final Logger logger = LoggerFactory.getLogger(BatchService.class);
    private static final ExecutorService DEFAULT_POOL = new ThreadPoolExecutor(30, 300, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());

    @Autowired
    private BatchFactory batchFactory;

    public <I, O> List<O> batchTransformData(Set<String> keySet, BatchContext<I, O> batchContext) {
        List<Set<String>> keySetList = Lists.newArrayList();
        keySetList.add(keySet);
        return batchTransformData(keySetList, batchContext);
    }

    public <I, O> List<O> batchTransformData(List<Set<String>> keySetList, BatchContext<I, O> batchContext) {
        BatchBean batchBean = new BatchBean();

        List<I> batchList = batchContext.getBatchList();
        // 注入参数
        for (I input : batchList) {
            batchContext.getPropertySetter().apply(input, batchBean);
        }
        // 根据参数去获取对应的批量信息,比如根据商品编码获取商品信息
        // 同时解决信息依赖的问题, 比如商品图片需要根据供应商编码, 供应商编码需要根据门店和商品编码获取,这时就产生依赖关系
        // 这时 在同步的情况下由两种解决方法 keyset 中的key值保持有序关系, 或者两个keyset 保持有序关系写入集合
        // 异步情况下 推荐使用两个keyset 保持有序关系写入集合 同个keyset中的元素无依赖关系可以支持并发执行, 多个keyset之间有序执行,保证页面查询效率
        if (batchContext.isAysn()) {
            doBatchEventAysn(keySetList, batchBean, batchContext.getExecutorService(), batchContext.getTimeout());
        } else {
            doBatchEvent(keySetList, batchBean);
        }
        List<O> output = Lists.newArrayList();
        // 赋值
        for (I input : batchList) {
            output.add(batchContext.getBatchDataHandle().transform(input, batchBean));
        }
        return output;
    }

    public void doBatchEventAysn(List<Set<String>> keySetList, BatchBean batchBean, ExecutorService executorService, int timeout) {
        for (Set<String> keySet : keySetList) {
            doBatchEventAysn(keySet, batchBean, executorService, timeout);
        }
    }

    public void doBatchEventAysn(Set<String> keySet, BatchBean batchBean, ExecutorService executorService, int timeout) {
        ExecutorService newExecutorService = executorService == null ? DEFAULT_POOL : executorService;
        List<Future> futureList = Lists.newArrayList();
        for (String key : keySet) {
            futureList.add(newExecutorService.submit(new Runnable() {
                @Override
                public void run() {
                    batchFactory.getBatchConvert(key).apply(batchBean);
                }
            }));
        }
        for (Future future : futureList) {
            try {
                future.get(timeout, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                logger.error("批量处理线程池发生异常", e);
            }
        }
    }

    public void doBatchEvent(List<Set<String>> keySetList, BatchBean batchBean) {
        for (Set<String> keySet : keySetList) {
            doBatchEvent(keySet, batchBean);
        }
    }

    public void doBatchEvent(Set<String> keySet, BatchBean batchBean) {
        for (String key : keySet) {
            batchFactory.getBatchConvert(key).apply(batchBean);
        }
    }
}
